package cn.hetra.hj212.client;

import cn.hetra.hj212.core.HJ212Data;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.MonoToListenableFutureAdapter;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;

/**
 * {@link TcpConnection} implementation that delegates to a Reactor Netty
 * {@link NettyInbound}/{@link NettyOutbound} pair.
 *
 * <p>Messages are written through the outbound side, idle callbacks are
 * registered on the connection obtained from the inbound side, and closing
 * is signalled through the completion sink supplied at construction time.
 */
public class ReactorNettyTcpConnection implements TcpConnection {

	/** Inbound side; also used to reach the underlying connection for idle callbacks. */
	private final NettyInbound inbound;

	/** Outbound side used for writing messages. */
	private final NettyOutbound outbound;

	/** Sink that is completed (emitted empty) when {@link #close()} is called. */
	private final Sinks.Empty<Void> completionSink;


	/**
	 * Create a connection wrapper around the given Reactor Netty handles.
	 * The arguments are stored as-is; no validation is performed here.
	 * @param inbound the inbound side of the connection
	 * @param outbound the outbound side of the connection
	 * @param completionSink the sink to complete when this connection is closed
	 */
	public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound, Sinks.Empty<Void> completionSink) {

		this.inbound = inbound;
		this.outbound = outbound;
		this.completionSink = completionSink;
	}

	/**
	 * Hand the given message to the outbound side as a single-element publisher.
	 * @param message the message to write
	 * @return a future adapted from the {@code Mono} returned by the outbound's
	 * {@code then()}
	 */
	@Override
	public ListenableFuture<Void> send(HJ212Data message) {
		// Wrap the message so it is passed to sendObject as a publisher.
		Mono<HJ212Data> payload = Mono.just(message);
		return new MonoToListenableFutureAdapter<>(this.outbound.sendObject(payload).then());
	}

	/**
	 * Register a callback through the connection's {@code onReadIdle} hook.
	 * @param runnable the callback to register
	 * @param inactivityDuration the idle duration, forwarded unchanged to
	 * {@code onReadIdle}
	 */
	@Override
	public void onReadInactivity(Runnable runnable, long inactivityDuration) {
		this.inbound.withConnection(connection -> {
			connection.onReadIdle(inactivityDuration, runnable);
		});
	}

	/**
	 * Register a callback through the connection's {@code onWriteIdle} hook.
	 * The connection is looked up through the inbound side.
	 * @param runnable the callback to register
	 * @param inactivityDuration the idle duration, forwarded unchanged to
	 * {@code onWriteIdle}
	 */
	@Override
	public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
		this.inbound.withConnection(connection -> {
			connection.onWriteIdle(inactivityDuration, runnable);
		});
	}

	/**
	 * Signal completion by emitting an empty terminal signal on the completion sink.
	 */
	@Override
	public void close() {
		// The emit result is deliberately not inspected: concurrent attempts to complete are ok.
		this.completionSink.tryEmitEmpty();
	}

}
